Source code for hysop.numerics.fft.gpyfft_fft

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
FFT interface for fast Fourier transforms using the clFFT backend (through gpyfft).
:class:`~hysop.numerics.GpyFFT`
:class:`~hysop.numerics.GpyFFTPlan`
"""

import warnings
import struct
import primefac
import numpy as np
from abc import abstractmethod

from gpyfft.fft import gfft, GFFT
from hysop import __KERNEL_DEBUG__, __TRACE_KERNELS__, __VERBOSE__
from hysop.numerics.fft.fft import (
    HysopFFTDataLayoutError,
    mk_shape,
    float_to_complex_dtype,
    complex_to_float_dtype,
)
from hysop.numerics.fft.opencl_fft import (
    OpenClFFTPlanI,
    OpenClFFTI,
    OpenClArray,
    OpenClArrayBackend,
    OpenClFFTQueue,
)

from hysop import vprint
from hysop.constants import Precision
from hysop.tools.htypes import first_not_None, check_instance
from hysop.tools.units import bytes2str
from hysop.tools.misc import prod
from hysop.tools.warning import HysopWarning
from hysop.tools.numerics import is_complex, is_fp
from hysop.tools.string_utils import framed_str

from hysop.backend.device.opencl import cl, clArray, __OPENCL_PROFILE__
from hysop.backend.device.codegen.base.variables import dtype_to_ctype
from hysop.backend.device.opencl.opencl_kernel_launcher import (
    trace_kernel,
    profile_kernel,
)


class HysopGpyFftWarning(HysopWarning):
    pass


# fix a weird bug in clfft/gpyfft
keep_plans_ref = []
[docs] class GpyFFTPlan(OpenClFFTPlanI): """ Build a clFFT plan using the gpyfft python interface. Emit warnings when transform output has an unaligned buffer offset. """ DEBUG = False def __new__( cls, cl_env, queue, in_array, out_array, axes, scaling=None, scale_by_size=None, fake_input=None, fake_output=None, callback_kwds=None, direction_forward=True, hardcode_twiddles=False, warn_on_unaligned_output_offset=True, warn_on_allocation=True, error_on_allocation=False, **kwds, ): obj = super().__new__(cls) keep_plans_ref.append(obj) return obj def __init__( self, cl_env, queue, in_array, out_array, axes, scaling=None, scale_by_size=None, fake_input=None, fake_output=None, callback_kwds=None, direction_forward=True, hardcode_twiddles=False, warn_on_unaligned_output_offset=True, warn_on_allocation=True, error_on_allocation=False, **kwds, ): """ Wrap gpyfft.FFT to allow more versatile callback settings and buffer allocations. Parameters ---------- cl_env: OpenClEnvironment OpenCL environment that will provide a context and a default queue. queue: OpenCL queue that will be used by default. in_array: cl.Array or OpenClArray Real input array for this transform. out_array: cl.Array or OpenClArray Real output array for this transform. axes: array_like of ints Axis over witch to compute the transform. scaling: float, optional Force the scaling of the transform. If not given, no scaling is applied (unlike clfft default behaviour). clFFT default scaling for backward transform can be enabled by passing 'DEFAULT' to this parameter. scale_by_size: int, optional, defaults to 1 Extra scaling by an integer: 1.0/S will be applied during the post callback. This is equivalent to setting scaling to 1.0/S but the two parameters are not mutually exclusive. fake_input: DummyArray, optional Fake array from which are computed transform shape and strides. Only used by R2R transforms. fake_output: DummyArray, optional Fake array from which are computed transform shape and strides. Only used by R2R transforms. direction_forward: bool, optional, defaults to True The direction of the transform. True <=> forward transform. hardcode_twiddles: bool, optional, defaults to False Hardcode twiddles as a __constant static array of complex directly in the opencl code. Only used by DCT-II, DCT-III, DST-II and DST-III. If set to False, the twiddles will be computed by the device on the fly, freeing device __constant memory banks. warn_on_unaligned_output_offset: bool, optional, defaults to True Emit a warning if the planner encounter an output array that has a non zero offset. warn_on_allocation: bool, optional, defaults to True Emit a warning if the planner has to allocate opencl buffers. error_on_allocation: bool, optional, defaults to False Raise a RuntimeError if the planner has to allocate opencl buffers. """ super().__init__(**kwds) fake_input = first_not_None(fake_input, in_array) fake_output = first_not_None(fake_output, out_array) callback_kwds = first_not_None(callback_kwds, {}) if queue is None: queue = cl_env.default_queue if queue.context != cl_env.context: msg = "Queue does not match context:" msg += f"\n *Given context is {cl_env.context}." msg += f"\n *Command queue context is {queue.context}." 
raise ValueError(msg) self.cl_env = cl_env self._queue = queue self.warn_on_unaligned_output_offset = warn_on_unaligned_output_offset self.warn_on_allocation = warn_on_allocation self.error_on_allocation = error_on_allocation self.temp_buffer = None self._setup = False self._baked = False self._allocated = False self.in_array = in_array self.out_array = out_array axes = np.asarray(axes) axes = (axes + in_array.ndim) % in_array.ndim assert in_array.ndim == out_array.ndim assert fake_input.ndim == in_array.ndim assert fake_output.ndim == out_array.ndim assert 0 < axes.size <= in_array.ndim, axes.size scale_by_size = first_not_None(scale_by_size, 1) self._setup_kwds = { "in_array": in_array, "out_array": out_array, "fake_input": fake_input, "fake_output": fake_output, "axes": axes, "scaling": scaling, "scale_by_size": scale_by_size, "direction_forward": direction_forward, "hardcode_twiddles": hardcode_twiddles, "callback_kwds": callback_kwds, }
    def setup(self, queue=None):
        super().setup(queue=queue)
        self.setup_plan(**self._setup_kwds)
        if queue is not None:
            check_instance(queue, OpenClFFTQueue)
            self.bake(queue=queue._queue)
        return self
[docs] def setup_plan( self, in_array, out_array, fake_input, fake_output, axes, direction_forward, scaling, scale_by_size, hardcode_twiddles, callback_kwds, ): # compute strides from fake arrays t_strides_in, t_distance_in, t_batchsize_in, t_shape_in, t_axes_in = ( self.calculate_transform_strides(axes, fake_input) ) t_strides_out, t_distance_out, t_batchsize_out, t_shape_out, t_axes_out = ( self.calculate_transform_strides(axes, fake_output) ) if not np.array_equal(t_axes_in, t_axes_out): msg = "Error finding transform axis (consider setting axes argument)" raise RuntimeError(msg) if not np.array_equal(t_batchsize_in, t_batchsize_out): msg = "Batchsize mismatch: {} vs {}." msg = msg.format(t_batchsize_in, t_batchsize_out) raise RuntimeError(msg) # Enforce no input and output overlap (unless inplace) t_inplace = False if in_array.base_data == out_array.base_data: if in_array.offset < out_array.offset: assert (in_array.offset + in_array.nbytes) < out_array.offset elif in_array.offset > out_array.offset: assert (out_array.offset + out_array.nbytes) < in_array.offset else: # in_array.offset == out_array.offset t_inplace = True # Check data types single_precision_types = (np.float32, np.complex64) double_precision_types = (np.float64, np.complex128) if in_array.dtype in single_precision_types: valid_precision_types = single_precision_types t_precision = gfft.CLFFT_SINGLE h_precision = Precision.FLOAT fp = "float" elif in_array.dtype in double_precision_types: valid_precision_types = double_precision_types t_precision = gfft.CLFFT_DOUBLE h_precision = Precision.DOUBLE fp = "double" else: msg = "Unsupported precision {}." msg = msg.format(in_array.dtype) raise NotImplementedError(msg) for array in (out_array, fake_input, fake_output): if array.dtype not in valid_precision_types: msg = "Incompatible precisions: Got {} but valid precisions are {} " msg += "based on input_array datatype which has been determined to be of kind {}." msg = msg.format(array.dtype, valid_precision_types, h_precision) raise RuntimeError(msg) # Determine transform layout and expected output shape and dtype float_types = (np.float32, np.float64) complex_types = (np.complex64, np.complex128) axe0 = t_axes_in[0] if fake_input.dtype in float_types: layout_in = gfft.CLFFT_REAL layout_out = gfft.CLFFT_HERMITIAN_INTERLEAVED expected_output_shape = mk_shape( fake_input.shape, axe0, fake_input.shape[axe0] // 2 + 1 ) expected_output_dtype = float_to_complex_dtype(fake_input.dtype) t_shape = t_shape_in elif fake_input.dtype in complex_types: if fake_output.dtype in float_types: layout_in = gfft.CLFFT_HERMITIAN_INTERLEAVED layout_out = gfft.CLFFT_REAL expected_output_shape = mk_shape( fake_input.shape, axe0, 2 * (fake_input.shape[axe0] - 1) ) expected_output_dtype = complex_to_float_dtype(fake_input.dtype) t_shape = t_shape_out elif fake_output.dtype in complex_types: layout_in = gfft.CLFFT_COMPLEX_INTERLEAVED layout_out = gfft.CLFFT_COMPLEX_INTERLEAVED expected_output_shape = fake_input.shape expected_output_dtype = fake_input.dtype t_shape = t_shape_in else: msg = "dtype {} is currently not handled." msg = msg.format(fake_output.dtype) raise NotImplementedError(msg) else: msg = "dtype {} is currently not handled." msg = msg.format(fake_input.dtype) raise NotImplementedError(msg) if fake_output.dtype != expected_output_dtype: msg = "Output array dtype {} does not match expected dtype {}." 
msg = msg.format(fake_output.dtype, expected_output_dtype) raise RuntimeError(msg) if not np.array_equal(fake_output.shape, expected_output_shape): msg = "Output array shape {} does not match expected shape {}." msg = msg.format(fake_output.shape, expected_output_shape) if (layout_in == gfft.CLFFT_HERMITIAN_INTERLEAVED) and ( layout_out == gfft.CLFFT_REAL ): expected_output_shape = mk_shape( fake_input.shape, axe0, 2 * (fake_input.shape[axe0] - 1) + 1 ) if not np.array_equal(fake_output.shape, expected_output_shape): raise RuntimeError(msg) else: raise RuntimeError(msg) if t_inplace and ( (layout_in is gfft.CLFFT_REAL) or (layout_out is gfft.CLFFT_REAL) ): assert (in_array.strides[t_axes_in[0]] == in_array.dtype.itemsize) and ( out_array.strides[t_axes_in[0]] == out_array.dtype.itemsize ), "Inplace real transforms need stride 1 for first transform axis." self.check_transform_shape(t_shape) plan = GFFT.create_plan(self.context, t_shape[::-1]) plan.inplace = t_inplace plan.strides_in = t_strides_in[::-1] plan.strides_out = t_strides_out[::-1] plan.distances = (t_distance_in, t_distance_out) plan.batch_size = t_batchsize_in plan.precision = t_precision plan.layouts = (layout_in, layout_out) if scaling == "DEFAULT": pass elif scaling is not None: plan.scale_forward = scaling plan.scale_backward = scaling else: plan.scale_forward = 1.0 plan.scale_backward = 1.0 # last transformed axis real output array size N = out_array.shape[axes[-1]] typegen = self.cl_env.build_typegen( precision=h_precision, float_dump_mode="dec", use_short_circuit_ops=False, unroll_loops=False, ) (in_data, out_data) = self.set_callbacks( plan=plan, axes=axes, in_array=in_array, out_array=out_array, fake_input=fake_input, fake_output=fake_output, layout_in=layout_in, layout_out=layout_out, N=N, S=scale_by_size, typegen=typegen, fp=fp, hardcode_twiddles=hardcode_twiddles, **callback_kwds, ) self.plan = plan self.in_data = in_data self.out_data = out_data self.is_inplace = t_inplace self.direction_forward = direction_forward if self.DEBUG: def estrides(array): s = array.dtype.itemsize return tuple(x // s for x in array.strides) msg = """ ::CLFFT PLANNER DEBUG:: Input array: shape={}, dtype={}, strides={} elements, base_offset={} Output array: shape={}, dtype={}, strides={} elements, base_offset={} Fake input: shape={}, dtype={}, strides={} elements Fake output: shape={}, dtype={}, strides={} elements Array configuration: t_distance_in: {} t_distance_out: {} t_axes_in: {} t_axes_out: {} t_batch_size_in: {} t_batch_size_out: {} t_shape_in: {} t_shape_out: {} t_strides_in: {} t_strides_out: {} Plan configuration: inplace: {} precision: {} layouts: (in={}, out={}) shape: {} strides_in: {} strides_out: {} batch_size: {} distances: (in={}, out={}) scale_forward: {} scale_backward: {} Pre callback source code: {} Post callback source code: {} """.format( in_array.shape, in_array.dtype, estrides(in_array), in_array.offset, out_array.shape, out_array.dtype, estrides(out_array), out_array.offset, fake_input.shape, fake_input.dtype, estrides(fake_input), fake_output.shape, fake_output.dtype, estrides(fake_output), t_distance_in, t_distance_out, t_axes_in, t_axes_out, t_batchsize_in, t_batchsize_out, t_shape_in, t_shape_out, t_strides_in, t_strides_out, plan.inplace, None, plan.layouts[0], plan.layouts[1], plan.shape, plan.strides_in, plan.strides_out, plan.batch_size, plan.distances[0], plan.distances[1], plan.scale_forward, plan.scale_backward, self.pre_callback_src.decode(), self.post_callback_src.decode(), ) print(msg) if scaling == 
"DEFAULT": pass elif scaling is not None: plan.scale_forward = scaling plan.scale_backward = scaling else: plan.scale_forward = 1.0 plan.scale_backward = 1.0 # profiling info is delegated to this class, inform the KernelListLauncher self._show_profiling_info = False # custom apply msg self._apply_msg_template = " fft_{}2{}_{}_{}_{{}}<<<>>>".format( "C" if is_complex(in_array) else "R", "C" if is_complex(out_array) else "R", "forward" if direction_forward else "backward", self.__class__.__name__.replace("Gpy", "") .replace("Plan", "_plan") .replace("FFT", "DFT"), )
[docs] def set_callbacks( self, plan, axes, N, in_array, out_array, fake_input, fake_output, layout_in, layout_out, **kwds, ): """Set plan pre and post callbacks and return in and out array data opencl buffers.""" (in_data, in_fp, oip) = self.compute_input_array_offset( in_array, fake_input, axes ) if (layout_in == gfft.CLFFT_HERMITIAN_INTERLEAVED) and ( layout_out == gfft.CLFFT_REAL ): # ******************************************************************************** # CLFFT C2R BUGFIX # Force the zero and the Nyquist frequency of the input to be purely real. (pre_src, user_data) = self.pre_offset_callback_C2R( offset_input_pointer=oip, in_fp=in_fp, N=N, **kwds ) # ******************************************************************************** else: (pre_src, user_data) = self.pre_offset_callback( offset_input_pointer=oip, in_fp=in_fp, N=N, **kwds ) (out_data, out_fp, oop) = self.compute_output_array_offset( out_array, fake_output, axes ) (post_src, user_data) = self.post_offset_callback( offset_output_pointer=oop, out_fp=out_fp, N=N, **kwds ) # *********************************************************************************** # GPYFFT BUGFIX # Keep a reference to callback source code to prevent dangling const char* pointers. # Do not remove because clfft only get the pointer and gpyfft does not increase the # refcount of those strings, resulting in random code injection into the fft kernels. pre_src = pre_src.encode("utf-8") post_src = post_src.encode("utf-8") self.pre_callback_src = pre_src self.post_callback_src = post_src # *********************************************************************************** if pre_src is not None: plan.set_callback(b"pre_callback", pre_src, "pre", user_data=user_data) if post_src is not None: plan.set_callback(b"post_callback", post_src, "post", user_data=user_data) return (in_data, out_data)
    @classmethod
    def check_transform_shape(cls, shape):
        """Check that clFFT can handle the logical transform size."""
        valid_factors = {2, 3, 5, 7, 11, 13}
        for Ni in shape:
            factors = tuple(primefac.primefac(int(Ni)))
            invalid_factors = set(factors) - valid_factors
            if invalid_factors:
                factorization = " * ".join(
                    f"{factor}^{factors.count(factor)}" for factor in set(factors)
                )
                candidates = ", ".join(str(vf) for vf in valid_factors)
                msg = "\nInvalid transform shape {} for clFFT:"
                msg += "\n {} = {}"
                msg += "\nOnly {} prime factors are available."
                msg += "\n"
                msg = msg.format(shape, Ni, factorization, candidates)
                raise ValueError(msg)
[docs] @classmethod def calculate_transform_strides(cls, taxes, array): """Redefine gpyfft.FFT.calculate_transform_strides""" shape = np.asarray(array.shape, dtype=np.uint32) strides = np.asarray(array.strides, dtype=np.uint32) dtype = array.dtype # array dimension and transform dimension ndim = len(shape) tdim = len(taxes) assert tdim <= ndim # transform axes and batch axes taxes[taxes < 0] += ndim baxes = np.asarray( tuple(a for a in range(ndim) if (a not in taxes)), dtype=np.uint32 ) # sort untransformed axes by strides. baxes = baxes[np.argsort(strides[baxes][::-1])][::-1] # compute a list of collapsable axes: [ [x,y], [z] ] cal = [] # collaspsable axes list cac = baxes[:1].tolist() # collaspsable axes candidates for a in baxes[1:]: if strides[a] == (strides[cac[-1]] * shape[cac[-1]]): cac.append(a) else: cal.append(cac) cac = [a] cal.append(cac) msg = "Data layout not supported (only single non-transformed axis allowed)" if len(cal) != 1: raise HysopFFTDataLayoutError(msg) baxes = cal[0] t_distances = strides[baxes] // dtype.itemsize if len(t_distances) == 0: t_distance = 0 else: t_distance = t_distances[0] batchsize = np.prod(shape[baxes]) t_shape = shape[taxes] t_strides = strides[taxes] // dtype.itemsize return (tuple(t_strides), t_distance, batchsize, tuple(t_shape), tuple(taxes))
[docs] @classmethod def compute_input_array_offset( cls, real_input, fake_input, axes, transform_offset="K", idx="k{}", batch_id="b", void_ptr="input", casted_ptr="in", ): new_input = cls.extract_array(real_input) input_offset = cls.get_array_offset(new_input, emit_warning=False) input_data = new_input.base_data input_fp = dtype_to_ctype(new_input.dtype) offset_input_pointer = cls.compute_pointer_offset( real_array=real_input, fake_array=fake_input, axes=axes, base_offset=input_offset, transform_offset=transform_offset, idx=idx, batch_id=batch_id, fp="const " + input_fp, void_ptr=void_ptr, casted_ptr=casted_ptr, is_input=True, ) return (input_data, input_fp, offset_input_pointer)
[docs] @classmethod def compute_output_array_offset( cls, real_output, fake_output, axes, transform_offset="K", idx="k{}", batch_id="b", void_ptr="output", casted_ptr="out", ): new_output = cls.extract_array(real_output) output_offset = cls.get_array_offset(new_output, emit_warning=True) output_data = new_output.base_data output_fp = dtype_to_ctype(new_output.dtype) offset_output_pointer = cls.compute_pointer_offset( real_array=real_output, fake_array=fake_output, axes=axes, base_offset=output_offset, transform_offset=transform_offset, idx=idx, batch_id=batch_id, fp=output_fp, void_ptr=void_ptr, casted_ptr=casted_ptr, is_input=False, ) return (output_data, output_fp, offset_output_pointer)
[docs] @classmethod def compute_pointer_offset( cls, real_array, fake_array, axes, base_offset, transform_offset, idx, batch_id, fp, void_ptr, casted_ptr, is_input, ): fake_strides, fake_distance, fake_batchsize, fake_shape, fake_axes = ( cls.calculate_transform_strides(axes, fake_array) ) assert len(fake_shape) == len(fake_strides) <= 3 ndim = len(fake_shape) K = transform_offset k = idx b = batch_id vptr = void_ptr cptr = casted_ptr D = fake_distance S = fake_strides[::-1] oip = () oip += (f"uint {K} = offset;",) if fake_batchsize > 1: oip += (f"const uint b = {K}/{D};",) # FIX ANOTHER CLFFT BUG (wrong batch size scheduling...) if is_input: oip += (f"if (b>={fake_batchsize}) {{ return ({fp})(NAN); }};",) else: oip += (f"if (b>={fake_batchsize}) {{ return ; }};",) oip += (f"{K} -= {b}*{D};",) for i in range(ndim - 1, -1, -1): Ki = idx.format("xyz"[i]) Si = S[i] oip += (f"const uint {Ki} = {K}/{Si};",) if i > 0: oip += (f"{K} -= {Ki}*{Si};",) if real_array is fake_array: offset = "{base_offset} + offset - {k}".format( base_offset=base_offset, K=K, k=k.format("x") ) else: real_strides, real_distance, real_batchsize, real_shape, real_axes = ( cls.calculate_transform_strides(axes, real_array) ) assert fake_batchsize == real_batchsize assert np.array_equal(fake_axes, real_axes) assert len(real_shape) == len(real_strides) == ndim real_offset = (f"{base_offset}uL",) if fake_batchsize > 1: real_offset += (f"{b}*{real_distance}",) for i in range(ndim - 1, 0, -1): Ki = idx.format("xyz"[i]) Si = real_strides[i] real_offset += ("{Ki}*{Si}".format(Ki, Si),) offset = " + ".join(real_offset) oip += ( "__global {fp}* {cptr} = (__global {fp}*)({vptr}) + {offset};".format( cptr=cptr, vptr=vptr, fp=fp, offset=offset ), ) indent = " " * 12 offset_pointer = indent + f"\n{indent}".join(oip) return offset_pointer
[docs] @classmethod def extract_array(cls, array): offset = array.offset // array.dtype.itemsize alignment = array.offset % array.dtype.itemsize assert alignment == 0, "Wrong array alignment." try: # if offset is aligned on device memory boundary we may be # able to create a subbuffer, else we keep original array. data = array.data new_array = array[offset:] except: new_array = array return new_array
    @classmethod
    def get_array_offset(cls, array, emit_warning):
        """
        Get array offset in terms of array elements, and emit a warning if the
        offset is non zero and emit_warning is set.
        """
        dtype = array.dtype
        if (array.offset % dtype.itemsize) != 0:
            msg = "Unaligned array offset."
            raise RuntimeError(msg)
        base_offset = array.offset // dtype.itemsize
        if emit_warning and (base_offset != 0):
            msg = "OpenCl array offset is not zero and will be injected into a clFFT pre or "
            msg += "post callback. This could entail bad results if this buffer is used as "
            msg += "an output: the beginning of this buffer may be used as a temporary "
            msg += "buffer during the transform before actual results are stored at the right "
            msg += "offset through the callback."
            warnings.warn(msg, HysopGpyFftWarning)
        return base_offset
[docs] def bake(self, queue=None): """Bake the plan.""" if self._baked: msg = "Plan was already baked." raise RuntimeError(msg) def fmt_arg(name): return self._setup_kwds[name] def fmt_array(name): arr = fmt_arg(name) return "shape={:<16} strides={:<16} dtype={:<16}".format( str(arr.shape) + ",", str(arr.strides) + ",", str(arr.dtype) ) title = f" Baking {self.__class__.__name__} " msg = """ in_array: {} out_array: {} fake_input: {} fake_output: {} axes: {} direction_forward: {} hardcode twiddles: {}""".format( fmt_array("in_array"), fmt_array("out_array"), fmt_array("fake_input"), fmt_array("fake_output"), fmt_arg("axes"), fmt_arg("direction_forward"), fmt_arg("hardcode_twiddles"), ) if self.verbose: print() print(framed_str(title, msg, c="*")) queue = first_not_None(queue, self.queue) self.plan.bake(queue) self._baked = True return self
[docs] def allocate(self, buf=None): """Allocate plan extra memory, possibly with a custom buffer.""" if self._allocated: msg = "Plan was already allocated." raise RuntimeError(msg) size = self.plan.temp_array_size if size > 0: if buf is None: if self.warn_on_allocation or self.error_on_allocation: msg = "Allocating temporary buffer of size {} for clFFT::{}." msg = msg.format(bytes2str(size), id(self)) if self.error_on_allocation: raise RuntimeError(msg) else: warnings.warn(msg, HysopGpyFftWarning) buf = cl.Buffer(self.context, cl.mem_flags.READ_WRITE, size=size) self.temp_buffer = buf elif buf.size != size: msg = "Buffer does not match required size: {} != {}" msg = msg.format(buf.size, size) raise ValueError(msg) else: self.temp_buffer = buf.data else: assert buf is None self._allocated = True return self
    def profile(self, events):
        for i, evt in enumerate(events):
            profile_kernel(None, evt, self._apply_msg_template.format(i))
            evt.wait()
        return evt
[docs] def enqueue(self, queue=None, wait_for=None): """ Enqueue transform with array base_data. """ queue = first_not_None(queue, self.queue) if not self._baked: self.bake(queue=queue) if not self._allocated: self.allocate() in_data, out_data = self.in_data, self.out_data direction_forward = self.direction_forward trace_kernel(self._apply_msg_template.format("kernels")) if self.is_inplace: events = self.plan.enqueue_transform( (queue,), (in_data,), direction_forward=direction_forward, temp_buffer=self.temp_buffer, wait_for_events=wait_for, ) else: # print(self.in_array) # print(self.out_array) events = self.plan.enqueue_transform( (queue,), (in_data,), (out_data), direction_forward=direction_forward, temp_buffer=self.temp_buffer, wait_for_events=wait_for, ) evt = self.profile(events) return evt
    def enqueue_arrays(self, *args, **kwds):
        msg = "Enqueue arrays is not supported yet."
        raise NotImplementedError(msg)
    def execute(self, **kwds):
        return self.enqueue(**kwds)
    @property
    def required_buffer_size(self):
        assert self._baked, "plan has not been baked yet."
        return self.plan.temp_array_size

    @property
    def ready(self):
        return self._baked and self._allocated

    @property
    def queue(self):
        return self._queue

    @property
    def context(self):
        return self.cl_env.context

    @property
    def input_array(self):
        return self.in_array

    @property
    def output_array(self):
        return self.out_array
[docs] def pre_offset_callback(self, offset_input_pointer, in_fp, **kwds): """Default pre_offset_callback, just inject input array offset.""" callback = """{fp} pre_callback(const __global void* input, const uint offset, __global void* userdata) {{ {offset_input_pointer} return in[kx]; }}""".format( fp=in_fp, offset_input_pointer=offset_input_pointer ) return callback, None
[docs] def pre_offset_callback_C2R(self, offset_input_pointer, fp, N, **kwds): """ C2R specific pre_offset_callback, inject input array offset and force the nyquist frequency to be purely real (fixes a bug in clfft for even C2R transform of dimension > 1). """ force_real_input = "(kx==0)" if N % 2 == 0: # Nyquist freq force_real_input += f"|| (kx=={N//2})" callback = """{fp}2 pre_callback(const __global void* input, const uint offset, __global void* userdata) {{ {offset_input_pointer} if ({force_real_input}) {{ return ({fp}2)(in[kx].x, 0); }} else {{ return in[kx]; }} }}""".format( fp=fp, force_real_input=force_real_input, offset_input_pointer=offset_input_pointer, ) return callback, None
[docs] def post_offset_callback(self, offset_output_pointer, out_fp, S, **kwds): """ Default post_offset_callback, just inject output array offset and scale by size (divide by some integer, which will often be the logical size of the transform or 1). """ callback = """void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp} R) {{ {offset_output_pointer} out[kx] = R / {S}; }}""".format( fp=out_fp, offset_output_pointer=offset_output_pointer, S=S ) return callback, None
[docs] @classmethod def fake_array(cls, shape, dtype, strides=None): """ Create a fake_array of given shape and dtype. If not given, the strides are computed from shape and dtype as if the array would be contiguous in memory. """ class DummyArray: def __init__(self, shape, strides, dtype): assert shape is not None assert dtype is not None dtype = np.dtype(dtype) if strides is None: strides = self.compute_strides(shape, dtype) assert len(shape) == len(strides) self.shape = shape self.strides = strides self.dtype = dtype self.ndim = len(shape) @classmethod def compute_strides(cls, shape, dtype): strides = list(shape)[::-1] strides[1:] = np.cumprod(strides[:-1]) strides[0] = 1 strides = tuple(x * dtype.itemsize for x in strides) return strides[::-1] array = DummyArray(shape=shape, strides=strides, dtype=dtype) return array
[docs] @classmethod def generate_twiddles( cls, name, base, count, typegen, fp, hardcode_twiddles, idx="kx", Tvar="T" ): """ Generate twiddles as a string. OpenCl __constant static array: exp(base*k0) for k in 0..count """ if hardcode_twiddles: k0 = np.arange(count) E = np.exp(1.0j * base * k0, dtype=np.complex128) base = "\t\t({fp}2)({}, {})" vals = ",\n".join( base.format(typegen.dump(x.real), typegen.dump(x.imag), fp=fp) for x in E ) twiddles = """ __constant static const {fp}2 {name}[{N}] = {{ {vals} }}; """.format( fp=fp, name=name, vals=vals, N=count ) twiddle = "const {fp}2 {Tvar} = {name}[{idx}];" twiddle = twiddle.format(fp=fp, Tvar=Tvar, name=name, idx=idx) else: twiddle = ( "const {fp}2 {Tvar} = ({fp}2)(cos({base}*{idx}), sin({base}*{idx}));" ) twiddle = twiddle.format(fp=fp, Tvar=Tvar, idx=idx, base=typegen.dump(base)) twiddles = "" return (twiddle, twiddles)
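
# A minimal sketch of how the layout helpers above behave; an illustrative helper
# only (not used elsewhere in this module). It relies solely on fake_array(), so
# no OpenCL device is required.
def _example_transform_layout():
    # A C-contiguous (4, 8, 16) float32 array transformed along its last axis:
    # the two untransformed axes collapse into a single batch of 4*8 = 32
    # transforms of length 16, with unit element stride and distance 16 elements.
    fake = GpyFFTPlan.fake_array(shape=(4, 8, 16), dtype=np.float32)
    strides, distance, batch, tshape, taxes = GpyFFTPlan.calculate_transform_strides(
        np.asarray([2]), fake
    )
    # Transform lengths whose prime factors all lie in {2, 3, 5, 7, 11, 13}
    # are accepted by clFFT...
    GpyFFTPlan.check_transform_shape(tshape)
    # ...while e.g. 17 is rejected with a ValueError.
    try:
        GpyFFTPlan.check_transform_shape((17,))
    except ValueError:
        pass
    return strides, distance, batch, tshape, taxes
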
[docs] class GpyR2RPlan(GpyFFTPlan): """ Specialization for real to real transforms built from r2c or c2r transforms. Real to real transforms use fake arrays as input and output along with custom pre and post processing callbacks. """ def __init__( self, in_array, out_array, fake_input, fake_output, scale_by_size, axes, **kwds ): """ Handmade R2R transforms rely on fake input and output that will never really be read or written. This is necessary because clFFT do not handle R2R transforms and we use pre and post processing to compute an equivalent R2C or C2R problem. Fake arrays are used to compute transform size, batch size and strides. Real arrays pointer are passed to the kernels and pre and post callbacks map the input and output data from those real arrays, adjusting the stride computations to the real array sizes from the fake array indices. """ real_types = (np.float32, np.float64) msg = f"Incompatible shapes {in_array.shape} vs {out_array.shape}." assert np.array_equal(in_array.shape, out_array.shape), msg msg = f"Incompatible dtypes {in_array.dtype} vs {out_array.dtype}." assert in_array.dtype == out_array.dtype, msg msg = f"Incompatible dtype {in_array.dtype}, expected {real_types}." assert in_array.dtype in real_types, msg msg = "Fake input has not been set." assert fake_input is not None, msg msg = "Fake output has not been set." assert fake_output is not None, msg axis = self.check_r2r_axes(in_array, axes) axes = np.asarray([axis]) super().__init__( in_array=in_array, out_array=out_array, fake_input=fake_input, fake_output=fake_output, axes=axes, scale_by_size=scale_by_size, **kwds, )
    def setup_plan(self, **kwds):
        super().setup_plan(**kwds)
        if self.is_inplace:
            msg = "R2R transforms cannot be computed in place on this backend."
            raise NotImplementedError(msg)
    @classmethod
    def prepare_r2r(cls, in_array, axes):
        """Return all the required variables to build fake arrays for all R2R transforms."""
        axis = cls.check_r2r_axes(in_array, axes)
        shape = in_array.shape
        N = shape[axis]
        dtype = in_array.dtype
        ctype = float_to_complex_dtype(dtype)
        return (dtype, ctype, shape, axis, N)
    @classmethod
    def check_r2r_axes(cls, in_array, axes):
        """Check that only the last axis is transformed."""
        axis = in_array.ndim - 1
        assert len(axes) == 1
        assert axes[0] in (-1, axis)
        return axis
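
# A small sketch of the fake-array mechanism described in the GpyR2RPlan
# docstring; an illustrative helper only (not used elsewhere in this module).
# prepare_r2r() only needs shape, dtype and ndim information, so a fake array
# is enough to inspect the geometry of a planned R2R transform.
def _example_prepare_r2r():
    # 8 rows of length N=16, transformed along the last axis (the only layout
    # accepted by check_r2r_axes).
    fake = GpyR2RPlan.fake_array(shape=(8, 16), dtype=np.float32)
    dtype, ctype, shape, axis, N = GpyR2RPlan.prepare_r2r(fake, axes=(-1,))
    assert (axis, N) == (1, 16)
    return dtype, ctype, shape, axis, N
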
class GpyDCTIPlan(GpyR2RPlan):
    def __init__(self, in_array, axes, **kwds):
        # print('\ncreate DCT-I plan {}'.format(id(self)))
        (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes)
        rshape = mk_shape(shape, axis, 2 * N - 2)
        cshape = mk_shape(shape, axis, N)
        fake_input = self.fake_array(shape=rshape, dtype=dtype)
        fake_output = self.fake_array(shape=cshape, dtype=ctype)
        super().__init__(
            in_array=in_array,
            axes=axes,
            fake_input=fake_input,
            fake_output=fake_output,
            **kwds,
        )

    # def __del__(self):
    #     print('\ndelete DCT-I plan {}'.format(id(self)))
    def pre_offset_callback(self, N, fp, offset_input_pointer, **kwds):
        pre = """{fp} pre_callback(const __global void* input, const uint offset,
                                   __global void* userdata) {{
            {offset_input_pointer}
            {fp} ret;
            if (kx<{N}) {{
                ret = in[kx];
            }}
            else {{
                ret = in[2*{N}-kx-2];
            }}
            return ret;
        }}""".format(
            N=N, fp=fp, offset_input_pointer=offset_input_pointer
        )
        return pre, None
    def post_offset_callback(self, fp, S, offset_output_pointer, **kwds):
        post = """void post_callback(__global void* output, const uint offset,
                                     __global void* userdata, const {fp}2 R) {{
            {offset_output_pointer}
            out[kx] = R.x/{S};
        }}""".format(
            S=S, fp=fp, offset_output_pointer=offset_output_pointer
        )
        return post, None
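
# A NumPy-only sketch of the identity behind the GpyDCTIPlan callbacks above
# (illustrative helper, not used elsewhere in this module): a DCT-I of length N
# is the real part of an rfft of the even extension of length 2N-2.
def _example_dct1_identity():
    x = np.array([1.0, 2.0, 3.0, 4.0])
    N = x.size
    # Reference: unnormalized DCT-I,
    # y_k = x_0 + (-1)^k x_{N-1} + 2*sum_{n=1}^{N-2} x_n cos(pi*n*k/(N-1)).
    k, n = np.arange(N)[:, None], np.arange(1, N - 1)
    ref = (
        x[0]
        + ((-1.0) ** np.arange(N)) * x[-1]
        + 2.0 * (x[1:-1] * np.cos(np.pi * n * k / (N - 1))).sum(axis=-1)
    )
    ext = np.concatenate([x, x[-2:0:-1]])  # even extension [1, 2, 3, 4, 3, 2]
    y = np.fft.rfft(ext).real              # length 2N-2 rfft -> exactly N bins
    assert np.allclose(y, ref)
    return y
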
[docs] class GpyDCTIIPlan(GpyR2RPlan): def __init__(self, in_array, axes, **kwds): (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes) rshape = mk_shape(shape, axis, N) cshape = mk_shape(shape, axis, N // 2 + 1) fake_input = self.fake_array(shape=rshape, dtype=dtype) fake_output = self.fake_array(shape=cshape, dtype=ctype) super().__init__( in_array=in_array, axes=axes, fake_input=fake_input, fake_output=fake_output, **kwds, )
[docs] def pre_offset_callback(self, N, fp, offset_input_pointer, **kwds): n = (N - 1) // 2 + 1 pre = """ {fp} pre_callback(const __global void* input, uint offset, __global void* userdata) {{ {offset_input_pointer} {fp} ret; if (kx<{n}) {{ ret = in[2*kx]; }} else {{ ret = in[2*({N}-kx)-1]; }} return ret; }}""".format( n=n, N=N, fp=fp, offset_input_pointer=offset_input_pointer ) return pre, None
[docs] def post_offset_callback( self, N, S, fp, offset_output_pointer, typegen, hardcode_twiddles, **kwds ): n = (N - 1) // 2 + 1 (twiddle, twiddles) = self.generate_twiddles( "dct2_twiddles", base=-np.pi / (2 * N), count=N // 2 + 1, fp=fp, typegen=typegen, hardcode_twiddles=hardcode_twiddles, ) post = """ {twiddles} void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp}2 R) {{ {offset_output_pointer} {twiddle} if (kx < {n}) {{ out[kx] = +2*(R.x*T.x - R.y*T.y)/{S}; }} if (kx > 0) {{ out[{N}-kx] = -2*(R.x*T.y + R.y*T.x)/{S}; }} }}""".format( N=N, S=S, n=n, fp=fp, twiddle=twiddle, twiddles=twiddles, offset_output_pointer=offset_output_pointer, ) return post, None
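
# A NumPy-only sketch of the identity behind the GpyDCTIIPlan callbacks above
# (illustrative helper, not used elsewhere in this module): the even/odd
# permutation of the pre-callback plus the exp(-i*pi*k/(2N)) twiddle of the
# post-callback implement the classical "DCT-II via a length-N FFT" trick.
def _example_dct2_identity():
    rng = np.random.default_rng(0)
    N = 8
    x = rng.standard_normal(N)
    # Reference: unnormalized DCT-II, y_k = 2*sum_n x_n cos(pi*(2n+1)*k/(2N)).
    n, k = np.arange(N), np.arange(N)[:, None]
    ref = 2.0 * (x * np.cos(np.pi * (2 * n + 1) * k / (2 * N))).sum(axis=-1)
    # Even samples in order, then odd samples reversed (the pre-callback order),
    # followed by a plain FFT and the post-callback twiddle.
    v = np.concatenate([x[0::2], x[1::2][::-1]])
    y = 2.0 * np.real(np.exp(-1j * np.pi * np.arange(N) / (2 * N)) * np.fft.fft(v))
    assert np.allclose(y, ref)
    return y
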
[docs] class GpyDCTIIIPlan(GpyR2RPlan): def __init__(self, in_array, axes, **kwds): (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes) rshape = mk_shape(shape, axis, N) cshape = mk_shape(shape, axis, N // 2 + 1) fake_input = self.fake_array(shape=cshape, dtype=ctype) fake_output = self.fake_array(shape=rshape, dtype=dtype) super().__init__( in_array=in_array, axes=axes, fake_input=fake_input, fake_output=fake_output, **kwds, )
    def pre_offset_callback(self, **kwds):
        msg = "pre_offset_callback_C2R should be used instead."
        raise NotImplementedError(msg)
[docs] def pre_offset_callback_C2R( self, N, S, fp, typegen, offset_input_pointer, hardcode_twiddles, **kwds ): (twiddle, twiddles) = self.generate_twiddles( "dct3_twiddles", base=+np.pi / (2 * N), count=N // 2 + 1, fp=fp, typegen=typegen, hardcode_twiddles=hardcode_twiddles, ) force_real_input = "(kx==0)" if N % 2 == 0: # Nyquist freq force_real_input += f"|| (kx=={N//2})" pre = """ {twiddles} {fp}2 pre_callback(const __global void* input, const uint offset, __global void* userdata) {{ {offset_input_pointer} {twiddle} {fp}2 C, R; R.x = in[kx]; if (kx==0) {{ R.y = 0; }} else {{ R.y = -in[{N}-kx]; }} C.x = R.x*T.x - R.y*T.y; if ({force_real_input}) {{ C.y = 0; }} else {{ C.y = R.x*T.y + R.y*T.x; }} return C; }}""".format( N=N, fp=fp, offset_input_pointer=offset_input_pointer, twiddle=twiddle, twiddles=twiddles, force_real_input=force_real_input, ) return pre, None
[docs] def post_offset_callback(self, N, S, fp, offset_output_pointer, **kwds): n = (N - 1) // 2 + 1 post = """ void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp} R) {{ {offset_output_pointer} if (kx < {n}) {{ out[2*kx] = R/{S}; }} else {{ out[2*({N}-kx)-1] = R/{S}; }} }}""".format( N=N, S=S, n=n, fp=fp, offset_output_pointer=offset_output_pointer ) return post, None
[docs] class GpyDSTIPlan(GpyR2RPlan): def __init__(self, in_array, axes, **kwds): (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes) rshape = mk_shape(shape, axis, 2 * N + 2) cshape = mk_shape(shape, axis, N + 2) fake_input = self.fake_array(shape=rshape, dtype=dtype) fake_output = self.fake_array(shape=cshape, dtype=ctype) super().__init__( in_array=in_array, axes=axes, fake_input=fake_input, fake_output=fake_output, **kwds, )
[docs] def pre_offset_callback(self, N, fp, offset_input_pointer, **kwds): pre = """{fp} pre_callback(const __global void* input, const uint offset, __global void* userdata) {{ {offset_input_pointer} {fp} ret; if ((kx==0) || (kx=={N}+1)) {{ ret = 0; }} else if (kx<{N}+1) {{ ret = -in[kx-1]; }} else {{ ret = +in[2*{N}+1-kx]; }} return ret; }}""".format( N=N, fp=fp, offset_input_pointer=offset_input_pointer ) return pre, None
[docs] def post_offset_callback(self, fp, N, S, offset_output_pointer, **kwds): post = """void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp}2 R) {{ {offset_output_pointer} if ((kx!=0) && (kx!={N}+1)) {{ out[kx-1] = R.y/{S}; }} }}""".format( N=N, S=S, fp=fp, offset_output_pointer=offset_output_pointer ) return post, None
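
# A NumPy-only sketch of the identity behind the GpyDSTIPlan callbacks above
# (illustrative helper, not used elsewhere in this module): the odd extension of
# length 2N+2 built by the pre-callback relates the DST-I to a plain rfft; the
# post-callback simply reads back the interior imaginary bins.
def _example_dst1_identity():
    x = np.array([1.0, 2.0, 3.0])
    N = x.size
    # Reference: unnormalized DST-I, y_k = 2*sum_n x_n sin(pi*(n+1)*(k+1)/(N+1)).
    k, n = np.arange(N)[:, None], np.arange(N)
    ref = 2.0 * (x * np.sin(np.pi * (n + 1) * (k + 1) / (N + 1))).sum(axis=-1)
    ext = np.concatenate([[0.0], -x, [0.0], x[::-1]])  # [0, -x, 0, reversed(x)]
    y = np.fft.rfft(ext).imag[1 : N + 1]
    assert np.allclose(y, ref)
    return y
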
[docs] class GpyDSTIIPlan(GpyR2RPlan): def __init__(self, in_array, axes, **kwds): (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes) rshape = mk_shape(shape, axis, N) cshape = mk_shape(shape, axis, N // 2 + 1) fake_input = self.fake_array(shape=rshape, dtype=dtype) fake_output = self.fake_array(shape=cshape, dtype=ctype) super().__init__( in_array=in_array, axes=axes, fake_input=fake_input, fake_output=fake_output, **kwds, )
[docs] def pre_offset_callback(self, N, fp, offset_input_pointer, **kwds): n = (N - 1) // 2 + 1 pre = """ {fp} pre_callback(const __global void* input, uint offset, __global void* userdata) {{ {offset_input_pointer} {fp} ret; if (kx<{n}) {{ ret = +in[2*kx]; }} else {{ ret = -in[2*({N}-kx)-1]; }} return ret; }}""".format( n=n, N=N, fp=fp, offset_input_pointer=offset_input_pointer ) return pre, None
[docs] def post_offset_callback( self, N, S, fp, offset_output_pointer, typegen, hardcode_twiddles, **kwds ): n = (N - 1) // 2 + 1 (twiddle, twiddles) = self.generate_twiddles( "dst2_twiddles", base=-np.pi / (2 * N), count=N // 2 + 1, fp=fp, typegen=typegen, hardcode_twiddles=hardcode_twiddles, ) post = """ {twiddles} void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp}2 R) {{ {offset_output_pointer} {twiddle} if (kx > 0) {{ out[kx-1] = -2*(R.x*T.y + R.y*T.x)/{S}; }} if (kx < {n}) {{ out[{N}-kx-1] = +2*(R.x*T.x - R.y*T.y)/{S}; }} }}""".format( N=N, S=S, n=n, fp=fp, twiddle=twiddle, twiddles=twiddles, offset_output_pointer=offset_output_pointer, ) return post, None
[docs] class GpyDSTIIIPlan(GpyR2RPlan): def __init__(self, in_array, axes, **kwds): (dtype, ctype, shape, axis, N) = self.prepare_r2r(in_array, axes) rshape = mk_shape(shape, axis, N) cshape = mk_shape(shape, axis, N // 2 + 1) fake_input = self.fake_array(shape=cshape, dtype=ctype) fake_output = self.fake_array(shape=rshape, dtype=dtype) super().__init__( in_array=in_array, axes=axes, fake_input=fake_input, fake_output=fake_output, **kwds, )
    def pre_offset_callback(self, **kwds):
        msg = "pre_offset_callback_C2R should be used instead."
        raise NotImplementedError(msg)
[docs] def pre_offset_callback_C2R( self, N, S, fp, typegen, offset_input_pointer, hardcode_twiddles, **kwds ): (twiddle, twiddles) = self.generate_twiddles( "dst3_twiddles", base=+np.pi / (2 * N), count=N // 2 + 1, fp=fp, typegen=typegen, hardcode_twiddles=hardcode_twiddles, ) force_real_input = "(kx==0)" if N % 2 == 0: # Nyquist freq force_real_input += f"|| (kx=={N//2})" pre = """ {twiddles} {fp}2 pre_callback(const __global void* input, const uint offset, __global void* userdata) {{ {offset_input_pointer} {twiddle} {fp}2 C, R; R.x = in[{N}-kx-1]; if (kx==0) {{ R.y = 0; }} else {{ R.y = -in[kx-1]; }} C.x = R.x*T.x - R.y*T.y; if ({force_real_input}) {{ C.y = 0; }} else {{ C.y = R.x*T.y + R.y*T.x; }} return C; }}""".format( N=N, fp=fp, offset_input_pointer=offset_input_pointer, twiddle=twiddle, twiddles=twiddles, force_real_input=force_real_input, ) return pre, None
[docs] def post_offset_callback(self, N, S, fp, offset_output_pointer, **kwds): n = (N - 1) // 2 + 1 post = """ void post_callback(__global void* output, const uint offset, __global void* userdata, const {fp} R) {{ {offset_output_pointer} if (kx < {n}) {{ out[2*kx] = +R/{S}; }} else {{ out[2*({N}-kx)-1] = -R/{S}; }} }}""".format( N=N, S=S, n=n, fp=fp, offset_output_pointer=offset_output_pointer ) return post, None
class GpyFFT(OpenClFFTI):
    """
    Interface to compute local to process FFT-like transforms using the clFFT
    backend through the gpyfft python interface.

    The clFFT backend has many advantages:
        - single and double precision supported
        - no intermediate temporary buffers created at each call
        - all required temporary buffers can be supplied or are auto-allocated only once
        - real planning capability (but no explicit caching capabilities)
        - injection of custom opencl code for pre and post processing

    It also has some disadvantages:
        - bad OpenCL CPU device support
        - the library is too greedy with temporary buffers for big transforms

    Planning may destroy the initial arrays content. Executing a plan may result
    in unwanted writes to output data, see notes.

    Note that custom code injection is not available for all transforms:
        1) All real to real transforms are implemented using pre and post
           processing capabilities.
        2) Pre and post processing is used to inject array base offsets.
           The user should take care of extending previously defined pre and post
           processing opencl code.

    Notes
    -----
    The output array is used during the transform and, if out.data is not aligned
    on device.MEM_BASE_ADDR_ALIGN, the beginning of the buffer may be overwritten
    by intermediate transform results.

        out.data = out.base_data + out.offset
        if (offset%alignment > 0)
            out.base_data[0:out.size] may be trashed during computation and the
            result of the transform will go to out.base_data[out.offset:out.offset+out.size]

    Thus, for every transform, out.base_data[0:min(out.offset,out.size)] may be
    overwritten with trash data. The default behaviour is to emit a warning when
    output data is not aligned on the device memory boundary.
    """

    def __init__(
        self,
        cl_env,
        backend=None,
        allocator=None,
        warn_on_allocation=True,
        warn_on_unaligned_output_offset=True,
        error_on_allocation=False,
        **kwds,
    ):
        super().__init__(
            cl_env=cl_env,
            backend=backend,
            allocator=allocator,
            warn_on_allocation=warn_on_allocation,
            error_on_allocation=error_on_allocation,
            **kwds,
        )
        self.supported_ftypes = (np.float32, np.float64)
        self.supported_ctypes = (np.complex64, np.complex128)
        self.supported_cosine_transforms = (1, 2, 3)
        self.supported_sine_transforms = (1, 2, 3)
        self.warn_on_unaligned_output_offset = warn_on_unaligned_output_offset
    def allocate_output(self, out, shape, dtype):
        """Allocate output if required and check shape and dtype."""
        if out is None:
            if self.warn_on_allocation or self.error_on_allocation:
                nbytes = prod(shape) * dtype.itemsize
                msg = "GpyFFT: allocating output array of size {}."
                msg = msg.format(bytes2str(nbytes))
                if self.error_on_allocation:
                    raise RuntimeError(msg)
                else:
                    warnings.warn(msg, HysopGpyFftWarning)
            out = self.backend.empty(shape=shape, dtype=dtype)
        else:
            assert out.dtype == dtype
            assert out.shape == shape
        return out
[docs] def bake_kwds(self, **kwds): plan_kwds = {} plan_kwds["in_array"] = kwds.pop("a") plan_kwds["out_array"] = kwds.pop("out") plan_kwds["scaling"] = kwds.pop("scaling", None) plan_kwds["scale_by_size"] = kwds.pop("scale_by_size", None) plan_kwds["axes"] = kwds.pop("axes", (kwds.pop("axis"),)) plan_kwds["cl_env"] = kwds.pop("cl_env", self.cl_env) plan_kwds["queue"] = kwds.pop("queue", self.queue) plan_kwds["verbose"] = kwds.pop("verbose", __VERBOSE__) plan_kwds["warn_on_allocation"] = kwds.pop( "warn_on_allocation", self.warn_on_allocation ) plan_kwds["error_on_allocation"] = kwds.pop( "error_on_allocation", self.error_on_allocation ) plan_kwds["warn_on_unaligned_output_offset"] = kwds.pop( "warn_on_unaligned_output_offset", self.warn_on_unaligned_output_offset ) if kwds: msg = "Unknown keyword arguments: {}" msg = msg.format(", ".join(f"'{kwd}'" for kwd in kwds.keys())) raise RuntimeError(msg) return plan_kwds
[docs] def fft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().fft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, **kwds) plan = GpyFFTPlan(**kwds) return plan
[docs] def ifft(self, a, out=None, axis=-1, **kwds): (shape, dtype, s) = super().ifft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, scaling="DEFAULT", **kwds) plan = GpyFFTPlan(direction_forward=False, **kwds) return plan
[docs] def rfft(self, a, out=None, axis=-1, **kwds): (shape, dtype) = super().rfft(a=a, out=out, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, **kwds) plan = GpyFFTPlan(**kwds) return plan
[docs] def irfft(self, a, out=None, n=None, axis=-1, **kwds): (shape, dtype, s) = super().irfft(a=a, out=out, axis=axis, n=n, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, scale_by_size=s, **kwds) plan = GpyFFTPlan(**kwds) return plan
[docs] def dct(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dct(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, **kwds) if type == 1: plan = GpyDCTIPlan(**kwds) elif type == 2: plan = GpyDCTIIPlan(**kwds) elif type == 3: plan = GpyDCTIIIPlan(**kwds) else: msg = f"Unimplemented cosine transform type {type}" raise RuntimeError(msg) return plan
[docs] def dst(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype) = super().dst(a=a, out=out, type=type, axis=axis, **kwds) out = self.allocate_output(out, shape, dtype) kwds = self.bake_kwds(a=a, out=out, axis=axis, **kwds) if type == 1: plan = GpyDSTIPlan(**kwds) elif type == 2: plan = GpyDSTIIPlan(**kwds) elif type == 3: plan = GpyDSTIIIPlan(**kwds) else: msg = f"Unimplemented sine transform type {type}" raise RuntimeError(msg) return plan
[docs] def idct(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype, itype, s) = super().idct( a=a, out=out, type=type, axis=axis, **kwds ) return self.dct(a=a, out=out, type=itype, axis=axis, scale_by_size=s, **kwds)
[docs] def idst(self, a, out=None, type=2, axis=-1, **kwds): (shape, dtype, itype, s) = super().idst( a=a, out=out, type=type, axis=axis, **kwds ) return self.dst(a=a, out=out, type=itype, axis=axis, scale_by_size=s, **kwds)
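
# A minimal usage sketch, assuming a valid HySoP OpenClEnvironment `cl_env`
# (providing a context and a default queue) and an OpenClArray `src` created on
# that environment's array backend; both names are placeholders for this
# illustrative helper, which is not used elsewhere in this module.
def _example_forward_rfft(cl_env, src, dst=None):
    # Build the clFFT interface, plan a real-to-complex transform along the
    # last axis of `src`, then let the plan bake and allocate lazily on the
    # first execution.
    fft = GpyFFT(cl_env=cl_env, warn_on_allocation=False)
    plan = fft.rfft(a=src, out=dst)
    plan.setup()
    evt = plan.execute()
    evt.wait()
    return plan.output_array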